import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics.pairwise import euclidean_distances
import numpy as np
import pandas as pd
maxlen = 3          # sequence length every review is padded/truncated to (see pad_sequences below)
max_features = 1000 # vocabulary size: only the top-1000 most frequent words are kept (num_words)
Let's prepare the vocabulary mapping that the Keras data loader uses, so we can encode input text and decode the model's output back into words:
# Build the word<->id lookup tables for the Keras IMDB dataset.  The raw index
# is shifted by 3 so that ids 0-3 are free for the special tokens below.
word_to_id = keras.datasets.imdb.get_word_index()
word_to_id = {word: index + 3 for word, index in word_to_id.items()}
for special_id, token in enumerate(["<PAD>", "<START>", "<UNK>", "<UNUSED>"]):
    word_to_id[token] = special_id
# Inverse mapping, used later to decode id sequences back into words.
id_to_word = {index: word for word, index in word_to_id.items()}
We load the data and preprocess it so the LSTMs can process it. This also handles the padding, in case a review is shorter than the defined sequence length.
# Fetch the IMDB reviews, keeping only the top `max_features` words, then pad
# (or truncate) every review to exactly `maxlen` tokens.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.imdb.load_data(num_words=max_features)
x_train, x_test = (
    keras.preprocessing.sequence.pad_sequences(split, maxlen=maxlen)
    for split in (x_train, x_test)
)
x_train.shape
x_train[0]
# One-hot encode every padded review so it can serve as the reconstruction
# target for the autoencoder: shape (num_samples, maxlen, max_features).
#
# The original version built this with a Python double loop appending into a
# list named `all` (shadowing the builtin).  Fancy-indexing an identity matrix
# produces the identical float array in a single vectorized step.
data_enc = np.eye(max_features)[x_train]
data_enc.shape
# Sanity check: argmax along the feature axis recovers the original token ids.
np.argmax(data_enc[0], axis=1).tolist()
# Sequence autoencoder with an auxiliary sentiment head.
# Encoder: embedding -> BiLSTM -> 3-dim latent code (3 dims so the latent
# space can be visualised with a 3-D scatter plot below).
inputs = keras.Input(shape=(None,), dtype="int32")
x = layers.Embedding(max_features, 128)(inputs)
x = layers.Bidirectional(layers.LSTM(128))(x)
x = layers.BatchNormalization()(x)
encoded = layers.Dense(3)(x)
# Decoder: repeat the latent code `maxlen` times and run it back through a
# BiLSTM to reconstruct a per-timestep distribution over the vocabulary.
x = layers.Dense(3)(layers.RepeatVector(maxlen)(encoded))
x = layers.BatchNormalization()(x)
x = layers.Bidirectional(layers.LSTM(128, return_sequences=True))(x)
decoded = layers.TimeDistributed(layers.Dense(max_features))(x)
decoded = layers.Softmax(name="decoded")(decoded)
# Sentiment head: binary prediction from the decoder BiLSTM's last timestep.
sentiment = layers.Dense(1)(x[:, -1])
sentiment = layers.Activation('sigmoid', name="sentiment")(sentiment)
# Joint model trained on both outputs.
model = keras.Model(inputs, [decoded, sentiment])
model.summary()
# Sub-models sharing the trained weights.  Note: despite its name, `decoder`
# maps raw token input to the reconstruction (the full input->decoded path),
# not latent code to reconstruction.
encoder = keras.Model(inputs, encoded)
decoder = keras.Model(inputs, decoded)
# Each named output gets its own loss: categorical cross-entropy for the
# per-timestep vocabulary reconstruction, binary cross-entropy for sentiment.
losses = {
    'decoded': 'categorical_crossentropy',
    'sentiment': 'binary_crossentropy',
}
model.compile(optimizer='adam', loss=losses)

targets = {'decoded': data_enc, 'sentiment': y_train}
model.fit(x_train, targets, epochs=50)

# Predict returns one array per output: (reconstruction, sentiment).
res = model.predict(x_test)
res[0].shape, res[1].shape
import plotly.graph_objects as go

# Project the whole training set into the 3-D latent space.
enc = encoder.predict(x_train)
enc[0]

# Split the latent points by sentiment label (boolean masks preserve order,
# matching the original per-sample append loop).
negative_points = enc[y_train == 0]
positive_points = enc[y_train == 1]

# One scatter trace per class, so the two sentiments get distinct colours.
traces = [
    go.Scatter3d(
        x=points[:, 0],
        y=points[:, 1],
        z=points[:, 2],
        mode='markers',
        marker={'size': 1},
    )
    for points in (negative_points, positive_points)
]
fig = go.Figure(traces)
fig.write_html('plot_sentiment.html')
fig.show()
When this notebook is exported, the interactive Plotly figure usually no longer renders, so see the attached HTML file to explore the plot.
First we create the input sequence that we want to run the model against.
# Encode the query words into token ids using the vocabulary built earlier.
query_words = ['best', 'movie', 'ever']
testing = [word_to_id[word] for word in query_words]
testing
Let's run the encoder and see where our sequence falls in the latent space.
# The encoder expects a batch, so wrap the single sequence in one extra axis.
query_batch = np.array([testing])
testing_space = encoder.predict(query_batch)
testing_space
We can use the Euclidean distance to figure out which training samples are closest in the latent space to the sequence we just encoded. After we sort the array by distance, the closest samples appear at the top:
# Distance from the query's latent code to every encoded training sample.
full_distances = euclidean_distances(np.array([testing_space[0]]), enc)

# Pair each distance with its training-set index and sort ascending so the
# most similar reviews come first.  Each entry is [distance, index] with the
# index stored as a float, matching the previous stack/reshape construction —
# but without the hard-coded 25000, so this works for any dataset size.
distances = sorted(
    ([float(dist), float(index)] for index, dist in enumerate(full_distances.ravel())),
    key=lambda pair: pair[0],
)
distances[:10]
Now we can display all the queries that the model encoded as "similar" to our input query.
# Decode and print the 20 nearest training reviews: index, distance, words.
for dist, sample_idx in distances[:20]:
    sample_idx = int(sample_idx)
    words = [id_to_word[token] for token in x_train[sample_idx]]
    print(sample_idx, dist, words)